package com.shujia.rec.dao;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import com.shujia.util.Config;

public class KafkaDao {

    private KafkaProducer<String, String> producer;

    public KafkaDao() {

        /**
         *
         * 创建kafka生产者
         *
         */

        //建立连接

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", Config.getString("kafka.bootstrap.servers")); //kafka连接地址
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);

        //创建生产者
        producer = new KafkaProducer<String, String>(properties);

    }


    public void sendLog(String topic, String key, String value) {
        //构建一条消息  一行数据
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
        producer.send(producerRecord);
    }

}
